from airflow.models.dag import DAG
from airflow.utils.dates import days_ago
import datetime
import pendulum
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Timezone for this DAG. It is used as the tzinfo of the DAG's start_date, and
# Airflow evaluates the cron schedule in start_date's timezone, so schedule
# times in this file are Asia/Shanghai wall-clock times.
local_tz = pendulum.timezone("Asia/Shanghai")
from dagen.operators import SSHOperator

# Jinja template for the partition date (YYYYMMDD), rendered per run and
# substituted into the job commands as `pt`.
# Why convert to Asia/Shanghai: Airflow keeps execution_date in UTC. For a
# 03:00 Asia/Shanghai (UTC+8) run, that is 19:00 UTC on the previous calendar
# day, so a UTC-based macro such as ds_nodash would yield the wrong date.
pt = (
    "{{ execution_date"
    ".in_tz('Asia/Shanghai')"
    ".format('YYYYMMDD') }}"
)

# Schedule interval: cron "minute hour dom month dow", i.e. every day at 03:00
# in the timezone of the DAG's start_date.
schedule_interval = "0 3 * * *"


# The DAG object all tasks below attach to.
# - start_date carries local_tz, so runs are scheduled in that timezone.
# - catchup=False: Airflow does not create runs for past intervals that were
#   never executed (e.g. before the DAG was enabled).
# - default_args apply to every task: no retries, and an e-mail is sent to the
#   listed address when a task fails.
dag = DAG(
    dag_id='cdp_integration_dag_gen',
    catchup=False,
    start_date=datetime.datetime(2021, 10, 21, tzinfo=local_tz),
    schedule_interval=schedule_interval,
    default_args=dict(
        owner='Avris',
        depends_on_past=False,
        retries=0,
        email=['guanghu@tesla.com'],
        email_on_failure=True,
        email_on_retry=False,
    ),
)

# Stage 1: run job.sh for the ads_cdp_user_baseinfo_overwrite job over SSH
# (connection eden_yarn_stg), passing the run's partition date as the Hive
# variable `pt`. `bash -x` echoes each executed command into the task log.
# NOTE(review): `pt` is a Jinja template; it is only rendered if dagen's
# SSHOperator lists `command` in its template_fields. Confirm.
user_baseInfo_gen_task_cmd = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start '
    '--job-name ads_cdp_user_baseinfo_overwrite '
    '--sql-params "--hivevar pt=%s"'
) % pt
user_baseInfo_gen_task = SSHOperator(
    task_id='user_baseInfo_gen_task',
    ssh_conn_id='eden_yarn_stg',
    command=user_baseInfo_gen_task_cmd,
    dag=dag,
)
# Stage 2: run cdp-tesla-bg-job.jar with the Cloudera JDK 1.8 over SSH
# (connection eden_yarn_stg). The command takes no per-run parameters.
# NOTE(review): timeout=5000 is forwarded to dagen's SSHOperator. Its unit and
# semantics are not visible here (presumably seconds). Confirm.
user_label_status_check_task_cmd = (
    "/usr/java/jdk1.8.0_181-cloudera/bin/java"
    " -jar /opt/bigdata/jar/cdp-tesla-bg-job.jar"
)
user_label_status_check_task = SSHOperator(
    task_id='user_label_status_check_task',
    ssh_conn_id='eden_yarn_stg',
    command=user_label_status_check_task_cmd,
    timeout=5000,
    dag=dag,
)

# Stage 3: run job.sh for the ads_cdp_user_profile_overwrite job over SSH
# (connection eden_yarn_stg), passing the run's partition date as the Hive
# variable `pt`. `bash -x` echoes each executed command into the task log.
user_profile_gen_task_cmd = (
    'bash -x /mnt/app/eden-dwh/eden-dwh-cdp-tagging/bin/job.sh start '
    '--job-name ads_cdp_user_profile_overwrite '
    '--sql-params "--hivevar pt=%s"'
) % pt
user_profile_gen_task = SSHOperator(
    task_id='user_profile_gen_task',
    ssh_conn_id='eden_yarn_stg',
    command=user_profile_gen_task_cmd,
    dag=dag,
)
# Stage 4: spark-submit (YARN, client mode, queue "batch") of
# com.tesla.cdp.service.WriteClickHouse for job ads_cdp_ck_user_profile_insert,
# with the run's partition date passed as `-pt`. Runs over SSH (eden_yarn_stg).
# The pieces below concatenate to a single command string.
# NOTE(review): spark.hadoop.mapreduce.input.pathFilter.class is set twice.
# spark-submit keeps the last --conf value, so the Hudi
# HoodieROTablePathFilter is overridden by org.apache.hadoop.fs.PathFilter,
# which is an interface. Confirm which one is intended.
# NOTE(review): the Spark application name is "test-job". Likely a leftover.
user_profile_2ck_task_cmd = (
    "/opt/bigdata/spark/current/bin/spark-submit --master yarn  "
    "--conf spark.yarn.queue=batch "
    "--class com.tesla.cdp.service.WriteClickHouse "
    "--deploy-mode client "
    "--name test-job "
    # Comma-separated --jars list (no spaces between entries).
    "--jars /home/hadoop/eden-dwh-cdp-tagging-1.0.jar,"
    "/opt/bigdata/jar/fastjson-1.2.78.jar,"
    "/opt/bigdata/jar/json4s-native_3-4.0.3.jar,"
    "/opt/bigdata/jar/config-1.3.3.jar,"
    "/opt/bigdata/jar/jcommander-1.72.jar,"
    "/opt/bigdata/jar/scala-logging_2.11-3.7.2.jar,"
    "/home/hadoop/eden-dwh-cdp-1.0/lib/spark-avro_2.12-3.0.1.jar,"
    "/home/hadoop/eden-dwh-cdp-1.0/lib/hudi-utilities-bundle_2.12-0.9.0.jar,"
    "/opt/bigdata/jar/clickhouse-jdbc-0.3.1.jar,"
    "/home/hadoop/eden-dwh-cdp-1.0/lib/eden-dwh-cdp-1.0.jar "
    "--conf spark.serializer=org.apache.spark.serializer.KryoSerializer "
    "--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension "
    "--conf spark.hadoop.mapreduce.input.pathFilter.class=org.apache.hudi.hadoop.HoodieROTablePathFilter "
    "--conf spark.hadoop.mapreduce.input.pathFilter.class=org.apache.hadoop.fs.PathFilter "
    # Application jar followed by the application's own arguments.
    "/home/hadoop/eden-dwh-cdp-tagging-1.0.jar "
    "-job-name ads_cdp_ck_user_profile_insert "
    "-pt %s"
) % pt
user_profile_2ck_task = SSHOperator(
    task_id='user_profile_2ck_task',
    ssh_conn_id='eden_yarn_stg',
    command=user_profile_2ck_task_cmd,
    dag=dag,
)

# Cross-DAG dependencies: one ExternalTaskSensor per upstream task.
# Row format: (sensor task_id in this DAG, upstream dag_id, upstream task_id).
# Because external_task_id is set, each sensor waits for that single upstream
# task, not for the whole upstream DAG run.
_upstream_tasks = [
    ('child_task1', 'dws_cdp_agg_smp_sm_activity_td_v2_generate', 'par_task1'),
    ('child_task2', 'cdp_arrive_store_label_generate', 'par_task2'),
    ('child_task3', 'cdp_charge_detail_gen', 'par_task3'),
    ('child_task4', 'cdp_purpose_car_generate', 'par_task4'),
    ('child_task5', 'cdp_arrive_store_participate_activity_gen', 'par_task5'),
]

# All sensors share the same settings:
# - execution_delta=1h: the sensor matches the upstream run whose
#   execution_date is this run's execution_date minus one hour.
#   NOTE(review): this assumes the upstream DAGs are scheduled exactly one hour
#   earlier (02:00 for this 03:00 schedule). Confirm, otherwise no run matches.
# - allowed_states/failed_states: succeed on 'success'; fail the sensor if the
#   upstream task is 'failed' or 'skipped'.
# - check_existence=True: fail if the upstream DAG/task does not exist, rather
#   than waiting.
# - timeout=60 (seconds).
#   NOTE(review): with Airflow's default poke_interval of 60 s this allows only
#   about one or two checks before the sensor times out, and the DAG sets
#   retries=0. Confirm this fail-fast behavior is intended.
task1, task2, task3, task4, task5 = [
    ExternalTaskSensor(
        task_id=sensor_task_id,
        external_dag_id=upstream_dag_id,
        external_task_id=upstream_task_id,
        dag=dag,
        timeout=60,
        execution_delta=datetime.timedelta(hours=1),
        allowed_states=['success'],
        failed_states=['failed', 'skipped'],
        check_existence=True,
    )
    for sensor_task_id, upstream_dag_id, upstream_task_id in _upstream_tasks
]

# Wiring: all five upstream sensors must succeed before the first stage runs.
# After that, the stages run strictly one after another:
# base info -> label status check -> user profile -> ClickHouse export.
[task1, task2, task3, task4, task5] >> user_baseInfo_gen_task
user_baseInfo_gen_task >> user_label_status_check_task
user_label_status_check_task >> user_profile_gen_task
user_profile_gen_task >> user_profile_2ck_task